package com.shengzai.flink.source

import org.apache.flink.streaming.api.scala._

/**
 * Streaming job that counts comma-separated tokens read from a text socket.
 *
 * Lines are read from the socket at host "master", port 8888. Each line is
 * split on "," and every resulting token is paired with the count 1. The
 * pairs are keyed by token and the count field is summed per key. The
 * resulting stream is printed.
 */
object Demo3SocketSource {

  /**
   * Builds the pipeline and submits it for execution.
   *
   * @param args command-line arguments; not read by this job
   */
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // Unbounded stream: text lines arriving on the socket.
    val socketDS: DataStream[String] = env.socketTextStream("master", 8888)

    // One element per comma-separated token of each incoming line.
    val tokens: DataStream[String] = socketDS.flatMap(line => line.split(","))

    // Attach an initial count of 1 to every token.
    val tokenOnes: DataStream[(String, Int)] = tokens.map(token => (token, 1))

    // Group by the token (first tuple field), then sum the count (tuple position 1).
    val keyedByToken = tokenOnes.keyBy(pair => pair._1)
    val resDS: DataStream[(String, Int)] = keyedByToken.sum(1)

    resDS.print()

    // Nothing runs until the environment is asked to execute the job graph.
    env.execute()
  }

}
